{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Crawler for The Internet Movie Script Database (IMSDb)\n",
    "\n",
    "Downloads and parses data from https://imsdb.com/ and optionally from other transcript sources.\n",
    "Raw transcripts, where dialogue is denoted by the positioning of blocks of text, are processed and turned into a dialogue text format where speakers are denoted as \\[person\\] with brackets.\n",
    "\n",
    "Note that transcripts with invalid character encoding will be discarded as they would add noise to the tokens. \n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/LAION-AI/Open-Assistant/blob/data/datasets/tv_dialogue/imsdb.ipynb)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# uncomment and run the lines below to set up when running in Colab\n",
    "# !git clone https://github.com/LAION-AI/Open-Assistant.git\n",
    "# %cd Open-Assistant/data/datasets/tv_dialogue\n",
    "# !pip install -r requirements.txt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "# global settings\n",
    "\n",
    "FOLDER = \"srt\"  # the parsed transcripts (one .txt file per script) are written into this folder\n",
    "STATUS = \"crawled.csv\"  # catalog of all scripts with their crawl status: empty = pending, 1.0 = saved, 0.0 = failed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# import required packages\n",
    "import os\n",
    "import io\n",
    "import re\n",
    "import requests\n",
    "import time\n",
    "import warnings\n",
    "\n",
    "try:\n",
    "    from BeautifulSoup import BeautifulSoup\n",
    "except ImportError:\n",
    "    from bs4 import BeautifulSoup\n",
    "from tqdm import tqdm\n",
    "\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "\n",
    "from typing import Tuple, Optional, Any"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "class IMSDbCrawler:\n",
    "    \"\"\"Downloads scripts from IMSDb and converts them into a dialogue text format.\n",
    "\n",
    "    The raw scripts mark dialogue by indentation only: a speaker name is indented\n",
    "    deeper than the line before it and the spoken text that follows is indented\n",
    "    less than the name. ``parse`` rewrites such pairs as ``[SPEAKER] text`` lines.\n",
    "    \"\"\"\n",
    "\n",
    "    HEADER = {\n",
    "        \"User-Agent\": \"Mozilla/5.0 (compatible; TranscriptCrawler/0.2)\",\n",
    "    }\n",
    "    TIMER = 300  # minimum pause between two requests, in milliseconds\n",
    "    TIMEOUT = 30  # seconds to wait for the server before a request is aborted\n",
    "    WEBSITE = \"https://imsdb.com/\"\n",
    "\n",
    "    def __init__(self, folder: Optional[str] = None) -> None:\n",
    "        \"\"\"Create the crawler and, if ``folder`` is given, make sure that folder exists.\n",
    "\n",
    "        With ``folder=None`` the transcripts are written to the path passed to ``_write`` as is.\n",
    "        \"\"\"\n",
    "        self.folder = folder\n",
    "        if self.folder is not None:\n",
    "            os.makedirs(self.folder, exist_ok=True)\n",
    "        self.last_call = 0  # time.time() of the latest request, 0 before the first one\n",
    "\n",
    "    def _get(self, url: str, allow_unicode_errors: bool = True) -> Optional[str]:\n",
    "        \"\"\"Fetch ``url`` and return its body as text, throttling consecutive requests.\n",
    "\n",
    "        Sleeps as long as needed to keep at least ``TIMER`` milliseconds between two\n",
    "        requests. Returns ``None`` for a 404 response. Errors raised by ``requests``\n",
    "        (connection problems, timeouts) are not caught here.\n",
    "        \"\"\"\n",
    "        # time.time() counts seconds while TIMER is in milliseconds, so the elapsed\n",
    "        # time has to be converted before the two are compared\n",
    "        elapsed_ms = (time.time() - self.last_call) * 1000.0\n",
    "        wait_ms = max(0.0, self.TIMER - elapsed_ms)\n",
    "        if wait_ms:\n",
    "            time.sleep(wait_ms / 1000.0)\n",
    "        # without a timeout a stalled connection would block the crawl forever\n",
    "        data = requests.get(url, headers=self.HEADER, timeout=self.TIMEOUT)\n",
    "        self.last_call = time.time()\n",
    "        if data.status_code == 404:\n",
    "            return None\n",
    "\n",
    "        try:\n",
    "            return data.content.decode(\"utf-8\")\n",
    "        except UnicodeDecodeError:\n",
    "            try:\n",
    "                # NOTE(review): ISO-8859-1 assigns a character to every byte value, so this\n",
    "                # decode cannot fail and the branch below (and with it the\n",
    "                # allow_unicode_errors flag) currently has no effect\n",
    "                return data.content.decode(\"ISO-8859-1\")  # latin-1\n",
    "            except UnicodeDecodeError:\n",
    "                if allow_unicode_errors:\n",
    "                    return data.content.decode(\"utf-8\", \"backslashreplace\")\n",
    "                else:\n",
    "                    return None\n",
    "\n",
    "    def get_catalog(self) -> pd.DataFrame:\n",
    "        \"\"\"Build the catalog of all scripts linked from the site's index pages.\n",
    "\n",
    "        Visits the alphabetical index pages (``0`` and ``A`` to ``Z``) and the listed\n",
    "        TV series pages; pages that cannot be fetched are skipped.\n",
    "\n",
    "        Returns a frame with the columns ``alpha`` (name of the index page), ``title``,\n",
    "        ``link`` and ``status`` (NaN for every row).\n",
    "        \"\"\"\n",
    "        movies = {\"alpha\": [], \"title\": [], \"link\": []}\n",
    "        letters = [\"0\"] + [chr(i) for i in range(ord(\"A\"), ord(\"Z\") + 1)]\n",
    "        alpha = [f\"alphabetical/{letter}\" for letter in letters]\n",
    "        tv = [\"TV/South%20Park.html\", \"TV/Stargate%20SG1.html\"]  # \"TV/Futurama.html\", \"TV/Seinfeld.html\", \"TV/Lost\"\n",
    "        for slug in tqdm(alpha + tv):\n",
    "            html = self._get(f\"{self.WEBSITE}{slug}\")\n",
    "            if html is None:\n",
    "                continue\n",
    "            dom = BeautifulSoup(html, \"html.parser\")\n",
    "            # the links are read from the last cell of the first row of the second table\n",
    "            for movie in dom.select(\"table\")[1].select(\"tr\")[0].select(\"td\")[-1].select(\"a\"):\n",
    "                movies[\"alpha\"].append(slug.split(\"/\")[1].split(\".html\")[0])\n",
    "                movies[\"title\"].append(movie.string)\n",
    "                movies[\"link\"].append(movie[\"href\"])\n",
    "        movies = pd.DataFrame(movies)\n",
    "        movies[\"status\"] = np.nan\n",
    "        return movies\n",
    "\n",
    "    def download(self, url: str) -> Optional[str]:\n",
    "        \"\"\"Return the HTML of the script that the detail page ``url`` links to.\n",
    "\n",
    "        ``url`` is appended to ``WEBSITE``. Returns ``None`` if the detail page is\n",
    "        missing, has no ``script-details`` table or contains no link to a script.\n",
    "        \"\"\"\n",
    "        html = self._get(f\"{self.WEBSITE}{url}\")\n",
    "        if html is None:\n",
    "            return None\n",
    "        dom = BeautifulSoup(html, \"html.parser\")\n",
    "        details = dom.find(\"table\", {\"class\": \"script-details\"})\n",
    "        if details is None:\n",
    "            # unexpected page layout: report 'nothing found' instead of failing on None\n",
    "            return None\n",
    "        for a in details.select(\"a\"):\n",
    "            href = a.get(\"href\", \"\")  # anchors without href are ignored\n",
    "            if \"scripts/\" in href and \".html\" in href:\n",
    "                script = href if \"http\" in href else f\"{self.WEBSITE}{href.lstrip('/')}\"\n",
    "                return self._get(script, allow_unicode_errors=False)\n",
    "        return None\n",
    "\n",
    "    def _clean_dom(self, html: str):\n",
    "        \"\"\"Parse ``html`` and return the element that holds the script text.\n",
    "\n",
    "        The document is narrowed step by step to ``div#content``, ``td.scrtext`` and\n",
    "        ``pre`` (each only if present); all ``script`` and ``a`` elements are removed\n",
    "        from the result.\n",
    "        \"\"\"\n",
    "        dom = BeautifulSoup(html, \"html.parser\")\n",
    "        content = dom.find(\"div\", {\"id\": \"content\"})\n",
    "        if content:\n",
    "            dom = content\n",
    "        scrtext = dom.find(\"td\", {\"class\": \"scrtext\"})\n",
    "        if scrtext:\n",
    "            dom = scrtext\n",
    "        pre = dom.find(\"pre\")\n",
    "        if pre:\n",
    "            dom = pre\n",
    "        for s in dom.select(\"script\"):\n",
    "            s.extract()\n",
    "        for a in dom.select(\"a\"):\n",
    "            a.extract()\n",
    "        return dom\n",
    "\n",
    "    def is_person(self, speaker: str) -> bool:\n",
    "        \"\"\"Guess whether ``speaker`` is a character name rather than some other line.\n",
    "\n",
    "        Rejected are: strings of at most one character, text containing '!', '?', '...'\n",
    "        or '--', text starting with '-', a digit or a lowercase letter, text wrapped in\n",
    "        parentheses or double quotes, text with an odd number of double quotes or\n",
    "        differing counts of '(' and ')', and text containing FADE(S), CUT(S) TO or MUSIC.\n",
    "        \"\"\"\n",
    "        if len(speaker) <= 1:\n",
    "            return False\n",
    "        first, final = speaker[0], speaker[-1]\n",
    "        if \"!\" in speaker or \"?\" in speaker or \"...\" in speaker:\n",
    "            return False\n",
    "        if first == \"-\" or \"--\" in speaker:\n",
    "            return False\n",
    "        if first.isnumeric():\n",
    "            return False\n",
    "        if first.isalpha() and first.islower():\n",
    "            return False\n",
    "        if (first, final) in ((\"(\", \")\"), ('\"', '\"')):\n",
    "            return False\n",
    "        if speaker.count('\"') % 2 != 0 or speaker.count(\"(\") != speaker.count(\")\"):\n",
    "            return False\n",
    "        return re.search(r\"\\b(FADES?|CUTS? TO|MUSIC)\\b\", speaker) is None\n",
    "\n",
    "    def parse(self, html) -> str:\n",
    "        \"\"\"Convert the HTML of a script page into dialogue text.\n",
    "\n",
    "        Lines at the smallest indentation found in the script are copied as they are.\n",
    "        A line indented deeper than the line before it is remembered as a possible\n",
    "        speaker; if the following line is indented less and ``is_person`` accepts the\n",
    "        name, both are merged into ``[SPEAKER] text``.\n",
    "\n",
    "        Returns an empty string if the result never contains two tagged lines in a row\n",
    "        (each followed by a blank line).\n",
    "        \"\"\"\n",
    "        dom = self._clean_dom(html)\n",
    "\n",
    "        # smallest indentation of any non-blank line\n",
    "        minlines = np.inf\n",
    "        for line in dom.text.splitlines():\n",
    "            match = re.findall(r\"(\\s*)(\\S.*)(?:\\r?\\n)*\", line)\n",
    "            if match:\n",
    "                minlines = min(minlines, len(match[0][0]))\n",
    "\n",
    "        text = \"\"\n",
    "        speaker = \"\"  # speaker candidate that has not been written to text yet\n",
    "        last = minlines  # indentation the current line is compared against\n",
    "        for line in dom.text.splitlines():\n",
    "            match = re.findall(r\"(\\s*)(\\S.*)(?:\\r?\\n)*\", line)\n",
    "            if match:\n",
    "                n = len(match[0][0])\n",
    "                script = match[0][1].strip()\n",
    "                # square brackets mark speakers in the output, so a fully bracketed\n",
    "                # source line is rewritten with parentheses\n",
    "                if script[0] == \"[\" and script[-1] == \"]\":\n",
    "                    script = f\"({script[1:-1]})\"\n",
    "                if n == minlines:\n",
    "                    # line at the base indentation: a pending candidate was no speaker\n",
    "                    if speaker:\n",
    "                        text += f\"{speaker}\\r\\n{script}\\r\\n\"\n",
    "                        speaker = \"\"\n",
    "                    else:\n",
    "                        text += f\"{script}\\r\\n\"\n",
    "                    last = n\n",
    "                else:\n",
    "                    if n >= last:\n",
    "                        if speaker:\n",
    "                            text += f\"{speaker}\\r\\n\"\n",
    "                            speaker = \"\"\n",
    "                        if n > last:\n",
    "                            # indented deeper than the previous line: possible speaker\n",
    "                            speaker = script\n",
    "                            last = n\n",
    "                        else:\n",
    "                            text += f\"{script}\\r\\n\"\n",
    "                    else:\n",
    "                        # indented less than the previous line: text spoken by the candidate\n",
    "                        if speaker:\n",
    "                            if not self.is_person(speaker):\n",
    "                                text += f\"{speaker}\\r\\n{script}\\r\\n\"\n",
    "                            else:\n",
    "                                if speaker[-1] == \":\":\n",
    "                                    speaker = speaker[:-1]\n",
    "                                text += f\"[{speaker}] {script}\\r\\n\"\n",
    "                            speaker = \"\"\n",
    "                        else:\n",
    "                            text += f\"{script}\\r\\n\"\n",
    "                    # a line that is just a number resets the reference indentation\n",
    "                    try:\n",
    "                        float(script.strip())\n",
    "                        last = minlines\n",
    "                    except ValueError:\n",
    "                        last = n\n",
    "            else:\n",
    "                text += \"\\r\\n\"\n",
    "        if speaker:\n",
    "            text += f\"{speaker}\\r\\n\"\n",
    "        # without two consecutive tagged lines the text is not accepted as a dialogue\n",
    "        if not re.findall(r\"\\[.+?\\] .+?\\r\\n\\r\\n\\[.+?\\] .+?\\r\\n\\r\\n\", text):\n",
    "            return \"\"\n",
    "        first_occurrence = re.findall(r\"\\[.+?\\] \", text)[0]\n",
    "        if text.count(first_occurrence) == 1:\n",
    "            # the first tag is turned back into a plain line when it is used only once;\n",
    "            # str.replace is used because re.sub would treat backslashes in the name as\n",
    "            # escapes of its replacement template\n",
    "            text = text.replace(first_occurrence, f\"{first_occurrence[1:-2]}\\r\\n\")\n",
    "\n",
    "        text = text.replace(\"&amp;\", \"&\")\n",
    "        # normalise the line endings and collapse runs of blank lines into a single one\n",
    "        text = \"\\r\\n\".join(text.splitlines())\n",
    "        text = re.sub(r\"(\\r*\\n)\", \"\\n\", text)\n",
    "        text = re.sub(r\"\\n{2,}\", \"\\n\\n\", text).strip()\n",
    "\n",
    "        return text\n",
    "\n",
    "    def _write(self, file: str, content: str) -> None:\n",
    "        \"\"\"Write ``content`` as UTF-8 text to ``file``, inside ``self.folder`` if one is set.\"\"\"\n",
    "        path = os.path.join(self.folder, file) if self.folder is not None else file\n",
    "        with open(path, \"w\", newline=None, encoding=\"utf-8\") as f:\n",
    "            f.write(content)\n",
    "\n",
    "    def save(self, url: str) -> bool:\n",
    "        \"\"\"Download, parse and store the script behind the catalog link ``url``.\n",
    "\n",
    "        The file name is derived from ``url``: a trailing extension is dropped, spaces\n",
    "        become ``_``, slashes become ``-`` and all other characters that are not\n",
    "        alphanumeric are removed; ``.txt`` is appended.\n",
    "\n",
    "        Returns ``True`` if a transcript was written and ``False`` if nothing could be\n",
    "        downloaded or the parsed text is shorter than 128 characters.\n",
    "        \"\"\"\n",
    "        html = self.download(url)\n",
    "        if not html:\n",
    "            return False\n",
    "        script = self.parse(html)\n",
    "        if len(script) < 128:\n",
    "            return False\n",
    "\n",
    "        name = \".\".join(url.split(\".\")[:-1]) if \".\" in url[-5:] else url\n",
    "        name = \"\".join([c for c in name.replace(\" \", \"_\").replace(\"/\", \"-\") if c.isalnum() or c in (\"-\", \"_\")])\n",
    "        self._write(f\"{name}.txt\", script)\n",
    "        return True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create the crawler; the output folder is created if it does not exist yet\n",
    "ic = IMSDbCrawler(FOLDER)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 88,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|████████████████████████████████████████████████████████████████████████████████████| 3/3 [00:01<00:00,  2.00it/s]\n"
     ]
    }
   ],
   "source": [
    "# collect all script links from the index pages and store them as the status file;\n",
    "# the status column is empty (NaN) for every entry until the crawl loop fills it in\n",
    "catalog = ic.get_catalog()\n",
    "catalog.to_csv(STATUS, index=False)\n",
    "catalog"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Done.\n"
     ]
    }
   ],
   "source": [
    "# NOTE: this will take really long\n",
    "# the status file is rewritten after every title, so entries that already have a\n",
    "# status are skipped when this cell is executed again\n",
    "catalog = pd.read_csv(STATUS)\n",
    "crawled = catalog.copy()\n",
    "pending = catalog[catalog[\"status\"].isna()]\n",
    "for index, row in pending.iterrows():\n",
    "    started = time.time()\n",
    "    print(f\"{row['alpha']} {row['title']}\", end=\" \")\n",
    "    succeeded = ic.save(row[\"link\"])\n",
    "    print(\"✔️\" if succeeded else \"❌\", end=\" \")\n",
    "    crawled.at[index, \"status\"] = 1.0 if succeeded else 0.0\n",
    "    print(f\"- {(time.time() - started):.3f}s\")\n",
    "    crawled.to_csv(STATUS, index=False)\n",
    "    done = pd.notna(crawled[\"status\"]).sum()\n",
    "    if done % 25 == 0:\n",
    "        print(\n",
    "            f\"▶▶▶ {done} done ({int(crawled['status'].sum())} successful) out of {len(crawled)} ◀◀◀\"\n",
    "        )\n",
    "print(\"Done.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
